
add system defined tags and free form labels to datasets#1553

Open
nikhilsinhaparseable wants to merge 11 commits into parseablehq:main from nikhilsinhaparseable:dataset-tags-lables

Conversation


@nikhilsinhaparseable commented Feb 21, 2026

  • PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation
  • PUT /api/prism/v1/datasets/{name} - create or update dataset tags and labels (an empty list clears all)
  • GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels
  • GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag
  • Include tags and labels in the home API response

Summary by CodeRabbit

  • New Features

    • New dataset HTTP endpoints: fetch correlated datasets, fetch by tag, and update dataset tags/labels; routes include appropriate auth checks.
  • User-facing Data Model

    • Streams and APIs now support multiple dataset tags and labels (lists). HTTP headers accept multiple tags/labels; PUT returns updated tags/labels.
  • Refactor

    • Tenant-scoped dataset operations, deduplication of tags/labels, and synchronized updates between persistent storage and in-memory state.


coderabbitai bot commented Feb 21, 2026

Walkthrough

Replaces the single optional dataset_tag with dataset_tags: Vec<DatasetTag> and dataset_labels: Vec<String> across metadata, storage, Parseable APIs, headers, and call sites; adds dataset HTTP endpoints and an ObjectStorage API to update tags/labels.
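As a sketch of the shape this walkthrough describes (type and variant names here are assumptions for illustration, not the crate's actual definitions):

```rust
// Illustrative sketch of the data-model change: the former
// `dataset_tag: Option<DatasetTag>` becomes two vectors, so a stream
// can carry several system tags and free-form labels at once.
#[derive(Debug, Clone, PartialEq)]
enum DatasetTag {
    AgentMonitoring,
    K8sMonitoring,
}

#[derive(Debug, Default)]
struct ObjectStoreFormat {
    dataset_tags: Vec<DatasetTag>,
    dataset_labels: Vec<String>,
}

fn main() {
    let fmt = ObjectStoreFormat {
        dataset_tags: vec![DatasetTag::K8sMonitoring],
        dataset_labels: vec!["prod".into(), "eu-west".into()],
    };
    assert_eq!(fmt.dataset_tags, vec![DatasetTag::K8sMonitoring]);
    assert_eq!(
        fmt.dataset_labels,
        vec!["prod".to_string(), "eu-west".to_string()]
    );
}
```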

Changes

Cohort / File(s) Summary
Model & Metadata
src/metadata.rs, src/storage/mod.rs, src/prism/home/mod.rs
Replaced dataset_tag: Option<...> with dataset_tags: Vec<...> and dataset_labels: Vec<String> in metadata, ObjectStoreFormat, StreamInfo, and public Home/DataSet responses (serde updated).
Parseable & Stream APIs
src/parseable/mod.rs, src/parseable/streams.rs, src/connectors/kafka/processor.rs, src/storage/field_stats.rs
Updated create_stream_if_not_exists/create_stream signatures to accept dataset_tags/dataset_labels; stream getters/setters pluralized; call sites updated to pass vec![] where None was used.
HTTP Handlers & Ingest
src/handlers/http/mod.rs, src/handlers/http/ingest.rs, src/handlers/http/prism_logstream.rs
Exported datasets module; propagated dataset vectors through ingest/OTEL setup and ensured tenant_id passed into dataset retrieval.
Datasets API & Routing
src/handlers/http/datasets.rs, src/handlers/http/modal/server.rs
Added new endpoints: GET /correlated/{name}, GET /tags/{tag}, PUT /{name} with RBAC checks; registered routes under /datasets.
Headers & Utilities
src/handlers/http/modal/utils/logstream_utils.rs, src/handlers/mod.rs
PutStreamHeaders now holds dataset_tags: Vec<DatasetTag> and dataset_labels: Vec<String>; added DATASET_TAGS_KEY/DATASET_LABELS_KEY, parsing helpers, dedupe, and logging for invalid entries; DatasetTag extended (Hash, new variants).
Object Storage API
src/storage/object_storage.rs
Added update_dataset_tags_and_labels_in_stream(...) to ObjectStorage trait to persist updated tags/labels.
Migration & Misc
src/migration/mod.rs, src/prism/logstream/mod.rs
Migration parsing updated to populate vectors; PrismDatasetRequest::get_datasets now accepts explicit tenant_id parameter and threads it through listing/processing.
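The comma-separated header parsing summarized above for logstream_utils.rs might behave roughly like this standalone sketch (the function name and exact semantics are assumed, not taken from the PR):

```rust
use std::collections::HashSet;

// Hypothetical sketch of parsing an X-P-Dataset-Labels style header:
// split on commas, trim whitespace, drop empty entries, and dedupe
// while preserving first-seen order.
fn parse_csv_header(value: &str) -> Vec<String> {
    let mut seen = HashSet::new();
    value
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .filter(|s| seen.insert(s.to_string()))
        .map(str::to_string)
        .collect()
}

fn main() {
    let labels = parse_csv_header(" prod, billing ,, prod ,  ");
    assert_eq!(labels, vec!["prod".to_string(), "billing".to_string()]);
}
```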

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTP_API as HTTP API
    participant Auth as RBAC
    participant Parseable
    participant ObjectStore as ObjectStorage/Metastore
    participant Storage as PersistedStore

    Client->>HTTP_API: PUT /api/v1/datasets/{name} {tags, labels}
    HTTP_API->>Auth: authorize Action::CreateStream
    Auth-->>HTTP_API: allowed/denied
    HTTP_API->>Parseable: validate & ensure stream loaded (tenant_id)
    Parseable->>ObjectStore: update_dataset_tags_and_labels_in_stream(name, tags, labels, tenant_id)
    ObjectStore->>Storage: get_stream_json(name, tenant_id)
    Storage-->>ObjectStore: stream_json
    ObjectStore->>Storage: put_stream_json(updated_json)
    Storage-->>ObjectStore: ack
    ObjectStore-->>Parseable: success
    Parseable-->>HTTP_API: return updated tags/labels
    HTTP_API-->>Client: 200 OK / error

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • parmesant
  • praveen5959

Poem

🐇 I nibbled tags into tidy rows,
Headers woke up, and metadata grows.
I hopped through routes and storage gates,
Shared labels danced with correlated mates.
Streams now wear bouquets — hooray, it shows!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Description check — ⚠️ Warning. The PR description provides a clear bulleted list of all new API endpoints and features, directly addressing the PR objectives. However, the provided template requires sections for testing confirmation, code comments, and documentation, which are not present. Resolution: add checklist items confirming testing of log ingestion/query, comments explaining code intent, and documentation for the new dataset tags/labels features and API endpoints.

✅ Passed checks (2 passed)

  • Title check — ✅ Passed. The title directly summarizes the main change: introducing system-defined tags and free-form labels for datasets, the primary objective across all file modifications.
  • Docstring Coverage — ✅ Passed. Docstring coverage is 83.33%, above the required threshold of 80.00%.



@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

480-489: Consider a builder pattern for stream creation parameters.

The #[allow(clippy::too_many_arguments)] annotation is a reasonable workaround, but with 7+ parameters, this function is becoming unwieldy. For a future improvement, consider introducing a StreamCreationParams struct or builder pattern to group these related parameters.

This is a minor suggestion and doesn't block the current PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/mod.rs` around lines 480 - 489, The function
create_stream_if_not_exists has many parameters; refactor by introducing a
StreamCreationParams struct (or a builder StreamCreationParamsBuilder) to group
stream_name, stream_type, custom_partition, log_source, telemetry_type,
dataset_tags, and dataset_labels; update create_stream_if_not_exists signature
to accept a single StreamCreationParams (or builder output) and adapt internal
usage accordingly, add a conversion constructor or builder methods for easy
construction where callers currently pass those seven+ args, and preserve
existing behavior and validation inside the new struct or builder.

coderabbitai bot previously approved these changes Feb 21, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

480-527: Consider bundling stream creation parameters into a struct.

The function now accepts 8 parameters (including self), which triggers the clippy::too_many_arguments lint. While the #[allow] attribute suppresses it, this pattern repeats across create_stream_if_not_exists, create_stream, and create_update_stream.

A dedicated struct (e.g., StreamCreationParams or StreamConfig) would:

  • Improve readability at call sites
  • Make future parameter additions easier
  • Provide a natural place for default values
💡 Example struct-based approach
pub struct StreamCreationParams {
    pub stream_type: StreamType,
    pub custom_partition: Option<String>,
    pub log_source: Vec<LogSourceEntry>,
    pub telemetry_type: TelemetryType,
    pub dataset_tags: Vec<DatasetTag>,
    pub dataset_labels: Vec<String>,
}

impl Default for StreamCreationParams {
    fn default() -> Self {
        Self {
            stream_type: StreamType::UserDefined,
            custom_partition: None,
            log_source: vec![],
            telemetry_type: TelemetryType::Logs,
            dataset_tags: vec![],
            dataset_labels: vec![],
        }
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/mod.rs` around lines 480 - 527, Create a StreamCreationParams
(or StreamConfig) struct to bundle the multiple stream creation arguments and
refactor create_stream_if_not_exists, create_stream, and create_update_stream to
accept that struct instead of the long parameter list; update the call site in
create_stream_if_not_exists to build a StreamCreationParams (using Default for
sensible defaults) and pass it to create_stream, adjust create_stream signature
to destructure or reference the struct fields (stream_type, custom_partition,
log_source, telemetry_type, dataset_tags, dataset_labels), remove the
#[allow(clippy::too_many_arguments)] on those functions, and add
conversions/constructors where needed so existing call sites can migrate with
minimal changes.
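A runnable sketch of the call-site ergonomics such a struct would buy (fields simplified; not the actual Parseable definitions): with struct-update syntax, callers override only what differs from the defaults.

```rust
// Simplified stand-in for the suggested StreamCreationParams;
// field names are assumptions based on the review comment.
#[derive(Debug, Clone, PartialEq, Default)]
struct StreamCreationParams {
    custom_partition: Option<String>,
    dataset_tags: Vec<String>,
    dataset_labels: Vec<String>,
}

fn main() {
    // Struct-update syntax keeps call sites short even as fields grow.
    let params = StreamCreationParams {
        dataset_labels: vec!["prod".into()],
        ..Default::default()
    };
    assert_eq!(params.dataset_labels, vec!["prod".to_string()]);
    assert!(params.dataset_tags.is_empty());
    assert!(params.custom_partition.is_none());
}
```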
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 163-169: The labels vector built in the PUT handler (where
new_labels is created from body.into_inner().labels) must trim each label and
filter out empty or whitespace-only strings before deduplicating; update the
pipeline that builds new_labels to map each label through .trim(), filter out
strings that are empty after trimming, then collect into a HashSet to dedupe and
back into a Vec (preserving whatever ordering you need). Ensure you modify the
code around the new_labels construction so labels are normalized (trimmed) and
empty/whitespace-only entries are removed prior to deduplication.
- Around line 31-186: Handlers get_correlated_datasets, get_datasets_by_tag,
put_dataset_tags and put_dataset_labels currently use PARSEABLE.streams/list,
PARSEABLE.get_stream and storage APIs without tenant context; update each
handler to resolve the tenant from the request (e.g., call
get_tenant_id_from_request or read the normalized tenant header injected by
middleware), then scope all stream listing and lookups to that tenant (filter
list results or call tenant-aware APIs) and ensure any
storage.update_dataset_tags_and_labels_in_stream and PARSEABLE.get_stream calls
include/are called with the resolved tenant or have the tenant header
overwritten server-side so clients cannot spoof it; ensure the same tenant
resolution is applied to both read (listing/get) and write
(put_dataset_tags/put_dataset_labels) flows.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 139-151: The current read-modify-write in put_dataset_tags uses
PARSEABLE.get_stream(...), calls stream.get_dataset_labels(), then
storage.update_dataset_tags_and_labels_in_stream(...) and finally
stream.set_dataset_tags(...), which allows TOCTOU race with concurrent
put_dataset_labels; fix by serializing updates per-stream (or by providing a
combined tags+labels atomic endpoint). Add a per-stream mutex/async lock on the
stream (e.g., a metadata_lock() or similar on the stream returned by
PARSEABLE.get_stream) and acquire it around the read-modify-write sequence that
uses stream.get_dataset_labels,
storage.update_dataset_tags_and_labels_in_stream, and stream.set_dataset_tags so
concurrent tag/label updates for the same stream are executed sequentially.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 41-88: get_correlated_datasets currently iterates the global
PARSEABLE.streams.list() and PARSEABLE.get_stream(...) without restricting to
the tenant, causing cross-tenant exposure; update the handler to derive the
tenant context (e.g., from request auth/headers or a tenant path param) and then
restrict both the initial stream lookup (PARSEABLE.get_stream(&dataset_name))
and the loop over streams to the same tenant: either call a tenant-scoped
listing API (e.g., PARSEABLE.streams.list_for_tenant(tenant) if available) or
filter the results of PARSEABLE.streams.list() by a tenant identity exposed on
each stream (e.g., s.get_tenant() == tenant) before skipping and comparing;
ensure the check for the target dataset also validates it belongs to the tenant
so only tenant-scoped streams are considered.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

150-156: ⚠️ Potential issue | 🟡 Minor

Normalize label input in PUT body before dedupe.

Line 150-Line 156 deduplicates raw strings but does not trim/filter whitespace-only labels, so empty labels can still be persisted.

Suggested fix
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 150 - 156, Normalize and filter
incoming labels before deduplication: when building final_labels from
body.labels (the match arm handling Some(labels)), map each label to trimmed
string and filter out labels that are empty or whitespace-only (e.g.,
labels.into_iter().map(|s| s.trim().to_string()).filter(|s| !s.is_empty())...),
then collect into a HashSet to dedupe and back into the desired collection; keep
the None branch using stream.get_dataset_labels() unchanged.

142-172: ⚠️ Potential issue | 🔴 Critical

Concurrent partial metadata updates can still lose data.

Line 142-Line 172 does a read-modify-write using current in-memory values for absent fields, so two concurrent requests (tags-only and labels-only) can overwrite each other with stale counterparts.

Suggested direction
 pub async fn put_dataset_metadata(...) -> Result<HttpResponse, DatasetsError> {
     ...
     let stream = PARSEABLE
         .get_stream(&dataset_name, &tenant_id)
         .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+    // Serialize metadata updates for this stream (or use atomic storage merge).
+    // let _guard = stream.metadata_lock().await;

     let final_tags = match body.tags { ... };
     let final_labels = match body.labels { ... };

     storage
         .update_dataset_tags_and_labels_in_stream(...)
         .await
         .map_err(DatasetsError::Storage)?;

     stream.set_dataset_tags(final_tags.clone());
     stream.set_dataset_labels(final_labels.clone());

If a stream-scoped lock is unavailable, prefer a storage-level atomic patch API that updates only provided fields server-side.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 142 - 172, Current code computes
final_tags/final_labels from in-memory stream values and then does a
read-modify-write, which lets concurrent requests overwrite each other; instead
either acquire the stream-scoped lock around the read-modify-write (use
stream.get_dataset_tags/get_dataset_labels and
stream.set_dataset_tags/set_dataset_labels inside the lock) or, preferably,
change the update to an atomic storage-side patch that accepts
Option<HashSet<_>> and only updates provided fields (modify the call to
storage.update_dataset_tags_and_labels_in_stream to pass Option types and
implement server-side merge), removing the reliance on in-memory reads to
prevent lost updates.
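The per-stream serialization this comment suggests can be sketched with a lock registry (illustrative types only, not Parseable's actual Stream or storage APIs): each stream name maps to its own mutex, and a partial update holds that lock for the whole read-merge-write.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical registry: stream name -> lock guarding (tags, labels).
#[derive(Default)]
struct MetadataRegistry {
    streams: Mutex<HashMap<String, Arc<Mutex<(Vec<String>, Vec<String>)>>>>,
}

impl MetadataRegistry {
    fn entry(&self, name: &str) -> Arc<Mutex<(Vec<String>, Vec<String>)>> {
        self.streams
            .lock()
            .unwrap()
            .entry(name.to_string())
            .or_default()
            .clone()
    }

    // Holding the per-stream lock across the merge means a concurrent
    // tags-only and labels-only update cannot clobber each other.
    fn update(&self, name: &str, tags: Option<Vec<String>>, labels: Option<Vec<String>>) {
        let slot = self.entry(name);
        let mut meta = slot.lock().unwrap();
        if let Some(t) = tags {
            meta.0 = t;
        }
        if let Some(l) = labels {
            meta.1 = l;
        }
    }
}

fn main() {
    let reg = MetadataRegistry::default();
    reg.update("s1", Some(vec!["k8s".into()]), None);
    reg.update("s1", None, Some(vec!["prod".into()]));
    let meta = reg.entry("s1").lock().unwrap().clone();
    assert_eq!(meta, (vec!["k8s".to_string()], vec!["prod".to_string()]));
}
```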
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

982-988: Prefer atomic update for tags+labels to avoid transient mixed state.

Line 982 and Line 986 update related metadata in two separate writes. A single setter that updates both fields under one lock would avoid short-lived inconsistent reads.

♻️ Suggested refactor
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 988, The two setters
set_dataset_tags and set_dataset_labels perform separate metadata.write() calls
causing transient inconsistent state; add a new atomic setter (e.g.,
set_dataset_tags_and_labels(&self, tags: Vec<DatasetTag>, labels: Vec<String>))
that acquires metadata.write() once and assigns both dataset_tags and
dataset_labels inside the same lock, then update call sites to use the new
method (or have the existing setters delegate to it) so modifications never
occur in two separate writes; reference the metadata RwLock field and the
existing set_dataset_tags/set_dataset_labels method names when making the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 222-227: The route currently mounts only PUT /{name} using
http::datasets::put_dataset_metadata (authorized with Action::CreateStream), but
the API requires distinct replacement endpoints for tags and labels; add two
separate routes for "/{name}/tags" and "/{name}/labels" each using web::put()
and wired to the appropriate handlers (e.g., http::datasets::put_dataset_tags
and http::datasets::put_dataset_labels) with the same
authorize_for_resource(Action::CreateStream) policy, or if those handlers do not
exist, split http::datasets::put_dataset_metadata into two functions and route
them accordingly so clients can call PUT /datasets/{name}/tags and PUT
/datasets/{name}/labels.

In `@src/handlers/mod.rs`:
- Around line 95-114: The TryFrom<&str> for DatasetTag currently only accepts
the new literals (e.g., "agent-monitoring", "k8s-monitoring") causing older
values to fail; update the DatasetTag enum and its impl TryFrom to accept legacy
names by adding serde aliases on the enum variants (e.g., add #[serde(alias =
"agent-observability")] and #[serde(alias = "k8s-observability")] to
DatasetTag::AgentMonitoring and ::K8sMonitoring) and extend the match in impl
TryFrom (the function try_from) to also match the old strings
("agent-observability", "k8s-observability") mapping them to the corresponding
variants, keeping the existing error message unchanged.

In `@src/storage/mod.rs`:
- Around line 134-137: The struct currently only reads dataset_tags so legacy
single-key dataset_tag values are dropped; update deserialization/migration to
map dataset_tag -> dataset_tags by adding a compatibility conversion: in
src/storage/mod.rs ensure deserialization recognizes a legacy dataset_tag (alias
or custom Deserialize) and appends it into the dataset_tags Vec, or add a
migration step that, when bumping
CURRENT_OBJECT_STORE_VERSION/CURRENT_SCHEMA_VERSION, transforms any legacy
dataset_tag value into dataset_tags before validation; target the symbols
dataset_tags, dataset_tag, CURRENT_OBJECT_STORE_VERSION and
CURRENT_SCHEMA_VERSION when making this change so older metadata is preserved
and participates in tag-based APIs.
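The legacy-alias handling suggested for DatasetTag might look like this self-contained sketch (variant and literal names are assumed from the comment, not the crate's code):

```rust
// Sketch of backward-compatible parsing: accept both the new literals
// and the legacy "-observability" names so old metadata still loads.
#[derive(Debug, PartialEq)]
enum DatasetTag {
    AgentMonitoring,
    K8sMonitoring,
}

impl TryFrom<&str> for DatasetTag {
    type Error = String;

    fn try_from(value: &str) -> Result<Self, Self::Error> {
        match value {
            // New names plus legacy aliases kept for old metadata.
            "agent-monitoring" | "agent-observability" => Ok(Self::AgentMonitoring),
            "k8s-monitoring" | "k8s-observability" => Ok(Self::K8sMonitoring),
            other => Err(format!("unknown dataset tag: {other}")),
        }
    }
}

fn main() {
    assert_eq!(
        DatasetTag::try_from("k8s-observability"),
        Ok(DatasetTag::K8sMonitoring)
    );
    assert!(DatasetTag::try_from("bogus").is_err());
}
```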


ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 419f53d and 4a3e23c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/handlers/http/ingest.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 210-220: The two route registrations conflict because
"/tags/{tag}" will match "/tags/correlated" before "/{name}/correlated", causing
get_correlated_datasets to be unreachable for a dataset named "tags"; change the
correlated endpoint path to avoid structural collision (for example replace
"/{name}/correlated" with "/correlated/{name}" in the route registration where
web::get().to(http::datasets::get_correlated_datasets).authorize_for_resource(Action::GetStreamInfo)
is set) and update the path extractor in the get_correlated_datasets handler and
any clients/docs referencing the old path so they use the new
"/correlated/{name}" form.
- Around line 200-227: The post_datasets handler currently derives tenant from
BasicAuth via extract_session_key_from_req; change it to read the tenant using
get_tenant_id_from_request (the same middleware-normalized header used by
get_info and the other dataset handlers) to enforce tenant scoping and prevent
header spoofing: locate the post_datasets function in http::prism_logstream,
remove or stop using extract_session_key_from_req for tenant resolution, call
get_tenant_id_from_request(req) (or equivalent helper used elsewhere) and pass
that tenant downstream to any dataset creation/validation logic so it matches
the behavior of get_datasets_by_tag, get_correlated_datasets, and
put_dataset_metadata.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a3e23c and 4a2b764.

📒 Files selected for processing (1)
  • src/handlers/http/modal/server.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

197-202: ⚠️ Potential issue | 🟡 Minor

Normalize request-body labels before storing them.

This path still stores label bodies verbatim, so blank labels and whitespace variants can persist and skew correlation results.

Suggested fix
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 197 - 202, The current
final_labels assignment uses body.labels verbatim and can retain blank or
whitespace-only variants; update the transformation so that when handling
Some(labels) you trim each label (e.g., call trim on the string), discard empty
results (filter out labels where trimmed.is_empty()), and then deduplicate by
collecting into a HashSet before converting back to the Vec used by
final_labels; apply this change to the final_labels computation that consumes
body.labels so stored labels are normalized (trimmed and non-empty).

189-216: ⚠️ Potential issue | 🔴 Critical

Partial PUTs still have a lost-update race.

Combining tags and labels into one storage call does not make this atomic because omitted fields are still read from current in-memory state first. A concurrent {"tags": ...} request and {"labels": ...} request can each write back a stale copy of the other field and clobber one update. Either make PUT a full replacement that requires both arrays, or serialize this read/merge/write per stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 189 - 216, The current PUT
handler builds final_tags and final_labels by reading in-memory via
stream.get_dataset_tags()/get_dataset_labels() and then calls
storage.update_dataset_tags_and_labels_in_stream(...), which creates a
lost-update race when concurrent partial PUTs update only one field; to fix,
either require full replacement (validate that body.tags and body.labels are
both Some and reject partial PUTs) or serialize read/merge/write per stream
(introduce a per-stream mutex/lock when computing final_tags/final_labels and
calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot
interleave; locate and modify the code that constructs final_tags/final_labels
and the call to update_dataset_tags_and_labels_in_stream to implement one of
these two strategies, using identifiers final_tags, final_labels,
stream.get_dataset_tags/get_dataset_labels, and
update_dataset_tags_and_labels_in_stream to find the right spot.
🧹 Nitpick comments (2)
src/handlers/mod.rs (2)

153-161: Same non-deterministic ordering concern for parse_dataset_labels.

Similar to parse_dataset_tags, the label ordering is non-deterministic after HashSet deduplication. This may affect API response consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 153 - 161, parse_dataset_labels currently
deduplicates via HashSet which yields non-deterministic ordering; change it to
produce a deterministic, stable order by using either a BTreeSet for
deduplication or collect into a Vec then sort before returning. Update the
function parse_dataset_labels (and mirror the same approach used for
parse_dataset_tags if present) to trim and filter as now, then dedupe
deterministically (BTreeSet::from_iter or dedupe Vec + sort) and return a
Vec<String> with a stable sorted order.

131-151: Non-deterministic ordering in parse_dataset_tags.

The function collects into a HashSet for deduplication, then converts to Vec. This results in non-deterministic ordering of the returned tags. If consistent ordering matters for API responses or comparison purposes, consider sorting or using IndexSet from the indexmap crate.

// Current: non-deterministic order
.collect::<HashSet<_>>()
.into_iter()
.collect()

// Alternative: deterministic order
.collect::<HashSet<_>>()
.into_iter()
.sorted() // from itertools' Itertools trait; requires Ord (or use sorted_by)
.collect()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 151, parse_dataset_tags currently
deduplicates via a HashSet then collects into a Vec which yields
non-deterministic ordering; change it to produce a deterministic order by either
(a) collecting into the HashSet for dedupe and then sorting the iterator
(requiring DatasetTag: Ord or using a comparator) before collecting into Vec, or
(b) replace the HashSet with an IndexSet from the indexmap crate to preserve
insertion order; update the function parse_dataset_tags (and any uses)
accordingly to ensure stable, deterministic ordering of the returned
Vec<DatasetTag>.
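The BTreeSet option from the two nitpicks above can be sketched as a stand-alone function. This is a minimal illustration only, assuming plain `String` labels rather than the crate's `DatasetTag` type; the name and signature mirror `parse_dataset_labels` but are not Parseable's actual API.

```rust
use std::collections::BTreeSet;

// Parse a comma-separated header value into trimmed, deduplicated labels.
// BTreeSet both removes duplicates and iterates in a stable sorted order,
// so repeated calls with the same input return the same Vec.
fn parse_dataset_labels(header_value: &str) -> Vec<String> {
    header_value
        .split(',')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}

fn main() {
    let labels = parse_dataset_labels("b, a ,b,, a");
    // duplicates and blanks dropped, output sorted
    assert_eq!(labels, vec!["a", "b"]);
    println!("{labels:?}");
}
```

The same shape works for tags once `DatasetTag` implements `Ord`; if insertion order should be preserved instead of sorting, `IndexSet` from the indexmap crate is the usual alternative.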
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 68-70: The 404 is returned too early because
PARSEABLE.get_stream(&dataset_name, &tenant_id) only checks the in-memory
registry; before mapping to DatasetsError::DatasetNotFound you must attempt the
same storage-hydration used by Parseable's logstream handlers to load the
dataset from object storage and populate the in-memory registry, then retry
get_stream; apply this change to the current occurrence and the similar one at
the later block (around lines 185-187) so that you only return
DatasetsError::DatasetNotFound if the stream is still absent after trying the
storage-load path.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 197-202: The current final_labels assignment uses body.labels
verbatim and can retain blank or whitespace-only variants; update the
transformation so that when handling Some(labels) you trim each label (e.g.,
call trim on the string), discard empty results (filter out labels where
trimmed.is_empty()), and then deduplicate by collecting into a HashSet before
converting back to the Vec used by final_labels; apply this change to the
final_labels computation that consumes body.labels so stored labels are
normalized (trimmed and non-empty).
- Around line 189-216: The current PUT handler builds final_tags and
final_labels by reading in-memory via
stream.get_dataset_tags()/get_dataset_labels() and then calls
storage.update_dataset_tags_and_labels_in_stream(...), which creates a
lost-update race when concurrent partial PUTs update only one field; to fix,
either require full replacement (validate that body.tags and body.labels are
both Some and reject partial PUTs) or serialize read/merge/write per stream
(introduce a per-stream mutex/lock when computing final_tags/final_labels and
calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot
interleave; locate and modify the code that constructs final_tags/final_labels
and the call to update_dataset_tags_and_labels_in_stream to implement one of
these two strategies, using identifiers final_tags, final_labels,
stream.get_dataset_tags/get_dataset_labels, and
update_dataset_tags_and_labels_in_stream to find the right spot.

---

Nitpick comments:
In `@src/handlers/mod.rs`:
- Around line 153-161: parse_dataset_labels currently deduplicates via HashSet
which yields non-deterministic ordering; change it to produce a deterministic,
stable order by using either a BTreeSet for deduplication or collect into a Vec
then sort before returning. Update the function parse_dataset_labels (and mirror
the same approach used for parse_dataset_tags if present) to trim and filter as
now, then dedupe deterministically (BTreeSet::from_iter or dedupe Vec + sort)
and return a Vec<String> with a stable sorted order.
- Around line 131-151: parse_dataset_tags currently deduplicates via a HashSet
then collects into a Vec which yields non-deterministic ordering; change it to
produce a deterministic order by either (a) collecting into the HashSet for
dedupe and then sorting the iterator (requiring DatasetTag: Ord or using a
comparator) before collecting into Vec, or (b) replace the HashSet with an
IndexSet from the indexmap crate to preserve insertion order; update the
function parse_dataset_tags (and any uses) accordingly to ensure stable,
deterministic ordering of the returned Vec<DatasetTag>.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/ingest.rs
  • src/parseable/streams.rs
  • src/handlers/http/modal/utils/logstream_utils.rs

@coderabbitai bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/prism/home/mod.rs (1)

205-209: ⚠️ Potential issue | 🟠 Major

Pick a canonical metadata copy for tags and labels.

get_all_stream_jsons can yield multiple ObjectStoreFormat entries for one dataset, but this response copies dataset_tags and dataset_labels from stream_jsons[0] only. Since tags/labels are now mutable through PUT /datasets/{name}, the home API can return stale metadata depending on which copy the metastore returns first. Read an explicit authoritative stream.json here instead of the first entry.

Also applies to: 240-251

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/home/mod.rs` around lines 205 - 209, The code currently calls
PARSEABLE.metastore.get_all_stream_jsons and then copies
dataset_tags/dataset_labels from stream_jsons[0], which can be stale; update
get_stream_metadata to instead fetch the canonical/authoritative stream.json
(not rely on the first entry) and extract dataset_tags and dataset_labels from
that single authoritative result (e.g., use a metastore method that returns the
canonical stream.json or a get_stream_json/get_authoritative_stream_json
variant), and apply the same fix to the other block that currently reads
stream_jsons[0] (the second occurrence mentioned at lines 240-251) so both
places use the explicit authoritative stream.json when building
StreamMetadataResponse.
src/prism/logstream/mod.rs (1)

231-245: ⚠️ Potential issue | 🟠 Major

Enforce the full per-stream permission set before building PrismDatasetResponse.

This path eventually returns schema, stats, retention, and counts, but the only dataset-level gate in process_stream is Action::ListStream. A user who can list a dataset can therefore still receive data that should sit behind GetSchema/Query or other stronger actions. Validate the full action set per stream before calling build_dataset_response, or trim the response to only fields covered by the existing check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/logstream/mod.rs` around lines 231 - 245, The code currently only
gates datasets by Action::ListStream in get_datasets/process_stream but still
builds full PrismDatasetResponse (schema, stats, retention, counts); fix by
enforcing per-stream permission checks for the full action set (e.g.,
Action::GetSchema, Action::Query, Action::GetRetention, Action::GetCounts, etc.)
before calling build_dataset_response — either (A) perform explicit permission
checks for those actions inside get_datasets (or have process_stream return the
set of allowed actions) and only include fields allowed by those actions, or (B)
change build_dataset_response to accept an allowed_actions flag and trim out
schema/stats/retention/counts when the caller lacks the corresponding
permission; apply this change around get_datasets, process_stream, and
build_dataset_response so responses never expose data beyond the allowed
per-stream actions.
♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

197-203: ⚠️ Potential issue | 🟡 Minor

Normalize labels before deduplication and persistence.

Lines 197-203 dedupe labels but do not trim or filter empty values, so whitespace-only labels can still be stored and matched inconsistently.

💡 Proposed fix
-    let final_labels = match body.labels {
-        Some(labels) => labels
-            .into_iter()
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect(),
+    let final_labels = match body.labels {
+        Some(labels) => labels
+            .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 197 - 203, final_labels is
deduped but not normalized, allowing whitespace-only or untrimmed labels to
persist; update the handling of body.labels (and the branch that falls back to
stream.get_dataset_labels() if needed) to map each label through trim(), filter
out labels that are empty after trimming, then collect into a HashSet to
deduplicate and finally into the expected collection type used later (use the
same symbol final_labels). Ensure both the Some(labels) branch and any code path
that constructs labels from stream.get_dataset_labels() apply the same
trim+filter+dedupe normalization.

68-70: ⚠️ Potential issue | 🟠 Major

Defer DatasetNotFound until storage hydration is attempted.

Lines 68-70 and 185-187 map in-memory get_stream misses straight to 404. In query/standalone flows, this can incorrectly 404 datasets that exist in object storage but are not hydrated yet. Please apply the same storage-hydration existence pattern used in logstream handlers before returning not found.

Based on learnings: stream existence checks should attempt storage-backed hydration in query/standalone mode before returning not found.

Also applies to: 185-187

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 68 - 70, The current code calls
PARSEABLE.get_stream(&dataset_name, &tenant_id) and immediately maps any error
to DatasetsError::DatasetNotFound (e.g., in the block around
PARSEABLE.get_stream and the similar check at lines 185-187), which causes a
premature 404 for datasets that exist in object storage but aren't hydrated;
instead, follow the logstream handler pattern: on get_stream error, attempt
storage-backed hydration (invoke the same hydrate/storage-check routine used by
logstream handlers for query/standalone flows) and only return
DatasetsError::DatasetNotFound if hydration confirms the dataset is absent or
hydration fails definitively; update the code paths around PARSEABLE.get_stream
and the duplicated check to defer mapping to DatasetNotFound until after the
hydration attempt completes.
🧹 Nitpick comments (2)
src/prism/home/mod.rs (1)

63-66: Keep tags and labels in the serialized shape even when empty.

With skip_serializing_if = "Vec::is_empty", datasets that simply have no metadata look the same as responses from servers that do not support these fields at all. Returning [] keeps the home-response schema stable for clients.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/home/mod.rs` around lines 63 - 66, The struct fields `tags:
Vec<DatasetTag>` and `labels: Vec<String>` in src/prism/home/mod.rs should
always be present in serialized output even when empty; remove the
`skip_serializing_if = "Vec::is_empty"` attribute while keeping
`#[serde(default)]` so empty vectors serialize as `[]`. Locate the `tags` and
`labels` field definitions in the struct (the lines with `tags: Vec<DatasetTag>`
and `labels: Vec<String>`) and delete only the `skip_serializing_if` portion of
their serde attributes, leaving `default` intact to still provide an empty Vec
when missing during deserialization.
src/parseable/streams.rs (1)

982-988: Add a single setter for dataset metadata.

The new dataset update flow has to call set_dataset_tags and set_dataset_labels separately, so readers can observe a mixed state between the two writes. A combined setter would keep the in-memory view coherent and avoid the second lock acquisition.

♻️ Suggested refactor
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
+
     pub fn set_dataset_tags(&self, tags: Vec<DatasetTag>) {
         self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags;
     }
 
     pub fn set_dataset_labels(&self, labels: Vec<String>) {
         self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 988, Introduce a single atomic
setter that updates both tags and labels under one write lock to avoid
intermediate inconsistent reads: add a new method (e.g. set_dataset_metadata or
set_dataset_tags_and_labels) that takes (tags: Vec<DatasetTag>, labels:
Vec<String>) and performs one self.metadata.write().expect(LOCK_EXPECT)
assignment setting both dataset_tags and dataset_labels in the same critical
section, and update call sites to use this combined setter instead of calling
set_dataset_tags and set_dataset_labels separately; keep the existing individual
setters only if backward compatibility is required but prefer the combined
setter to prevent two separate lock acquisitions and mixed-state visibility.
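The combined-setter suggestion above can be sketched with simplified stand-in types (`String` tags instead of `DatasetTag`, and a local `Stream` struct rather than Parseable's); only the single write-lock acquisition is the point here.

```rust
use std::sync::RwLock;

#[derive(Default)]
struct LogStreamMetadata {
    dataset_tags: Vec<String>,
    dataset_labels: Vec<String>,
}

struct Stream {
    metadata: RwLock<LogStreamMetadata>,
}

impl Stream {
    // One write-lock acquisition updates both fields, so a reader can
    // never observe new tags paired with stale labels (or vice versa).
    fn set_dataset_metadata(&self, tags: Vec<String>, labels: Vec<String>) {
        let mut meta = self.metadata.write().expect("metadata lock poisoned");
        meta.dataset_tags = tags;
        meta.dataset_labels = labels;
    }

    fn get_dataset_metadata(&self) -> (Vec<String>, Vec<String>) {
        let meta = self.metadata.read().expect("metadata lock poisoned");
        (meta.dataset_tags.clone(), meta.dataset_labels.clone())
    }
}

fn main() {
    let stream = Stream {
        metadata: RwLock::new(LogStreamMetadata::default()),
    };
    stream.set_dataset_metadata(vec!["sensitive".into()], vec!["team-a".into()]);
    let (tags, labels) = stream.get_dataset_metadata();
    assert_eq!(tags, vec!["sensitive"]);
    assert_eq!(labels, vec!["team-a"]);
}
```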
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/prism/home/mod.rs`:
- Around line 205-209: The code currently calls
PARSEABLE.metastore.get_all_stream_jsons and then copies
dataset_tags/dataset_labels from stream_jsons[0], which can be stale; update
get_stream_metadata to instead fetch the canonical/authoritative stream.json
(not rely on the first entry) and extract dataset_tags and dataset_labels from
that single authoritative result (e.g., use a metastore method that returns the
canonical stream.json or a get_stream_json/get_authoritative_stream_json
variant), and apply the same fix to the other block that currently reads
stream_jsons[0] (the second occurrence mentioned at lines 240-251) so both
places use the explicit authoritative stream.json when building
StreamMetadataResponse.

In `@src/prism/logstream/mod.rs`:
- Around line 231-245: The code currently only gates datasets by
Action::ListStream in get_datasets/process_stream but still builds full
PrismDatasetResponse (schema, stats, retention, counts); fix by enforcing
per-stream permission checks for the full action set (e.g., Action::GetSchema,
Action::Query, Action::GetRetention, Action::GetCounts, etc.) before calling
build_dataset_response — either (A) perform explicit permission checks for those
actions inside get_datasets (or have process_stream return the set of allowed
actions) and only include fields allowed by those actions, or (B) change
build_dataset_response to accept an allowed_actions flag and trim out
schema/stats/retention/counts when the caller lacks the corresponding
permission; apply this change around get_datasets, process_stream, and
build_dataset_response so responses never expose data beyond the allowed
per-stream actions.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 197-203: final_labels is deduped but not normalized, allowing
whitespace-only or untrimmed labels to persist; update the handling of
body.labels (and the branch that falls back to stream.get_dataset_labels() if
needed) to map each label through trim(), filter out labels that are empty after
trimming, then collect into a HashSet to deduplicate and finally into the
expected collection type used later (use the same symbol final_labels). Ensure
both the Some(labels) branch and any code path that constructs labels from
stream.get_dataset_labels() apply the same trim+filter+dedupe normalization.
- Around line 68-70: The current code calls PARSEABLE.get_stream(&dataset_name,
&tenant_id) and immediately maps any error to DatasetsError::DatasetNotFound
(e.g., in the block around PARSEABLE.get_stream and the similar check at lines
185-187), which causes a premature 404 for datasets that exist in object storage
but aren't hydrated; instead, follow the logstream handler pattern: on
get_stream error, attempt storage-backed hydration (invoke the same
hydrate/storage-check routine used by logstream handlers for query/standalone
flows) and only return DatasetsError::DatasetNotFound if hydration confirms the
dataset is absent or hydration fails definitively; update the code paths around
PARSEABLE.get_stream and the duplicated check to defer mapping to
DatasetNotFound until after the hydration attempt completes.

---

Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 982-988: Introduce a single atomic setter that updates both tags
and labels under one write lock to avoid intermediate inconsistent reads: add a
new method (e.g. set_dataset_metadata or set_dataset_tags_and_labels) that takes
(tags: Vec<DatasetTag>, labels: Vec<String>) and performs one
self.metadata.write().expect(LOCK_EXPECT) assignment setting both dataset_tags
and dataset_labels in the same critical section, and update call sites to use
this combined setter instead of calling set_dataset_tags and set_dataset_labels
separately; keep the existing individual setters only if backward compatibility
is required but prefer the combined setter to prevent two separate lock
acquisitions and mixed-state visibility.

In `@src/prism/home/mod.rs`:
- Around line 63-66: The struct fields `tags: Vec<DatasetTag>` and `labels:
Vec<String>` in src/prism/home/mod.rs should always be present in serialized
output even when empty; remove the `skip_serializing_if = "Vec::is_empty"`
attribute while keeping `#[serde(default)]` so empty vectors serialize as `[]`.
Locate the `tags` and `labels` field definitions in the struct (the lines with
`tags: Vec<DatasetTag>` and `labels: Vec<String>`) and delete only the
`skip_serializing_if` portion of their serde attributes, leaving `default`
intact to still provide an empty Vec when missing during deserialization.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: e591f47e-21dd-4410-8761-13a3ec03a40b

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/migration/mod.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/ingest.rs
  • src/parseable/mod.rs
  • src/handlers/mod.rs

@coderabbitai bot left a comment

🧹 Nitpick comments (1)
src/handlers/mod.rs (1)

131-151: Non-deterministic ordering in parsed tags/labels.

The parse_dataset_tags and parse_dataset_labels functions collect into a HashSet for deduplication then convert to Vec. Since HashSet iteration order is non-deterministic, the resulting vector order may vary across calls with the same input. This could cause issues with:

  • Consistent API responses
  • Testing
  • Diff-based change detection

If ordering matters for any downstream use case (e.g., serialization stability), consider sorting the result or using IndexSet from the indexmap crate to preserve insertion order while deduplicating.

♻️ Optional fix to ensure deterministic ordering
 pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
-    header_value
+    let mut tags: Vec<DatasetTag> = header_value
         .split(',')
         .filter_map(|s| {
             let trimmed = s.trim();
             if trimmed.is_empty() {
                 None
             } else {
                 match DatasetTag::try_from(trimmed) {
                     Ok(tag) => Some(tag),
                     Err(err) => {
                         warn!("Invalid dataset tag '{trimmed}': {err}");
                         None
                     }
                 }
             }
         })
         .collect::<HashSet<_>>()
         .into_iter()
-        .collect()
+        .collect();
+    tags.sort_by(|a, b| a.to_string().cmp(&b.to_string()));
+    tags
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 151, The functions parse_dataset_tags
(and parse_dataset_labels) deduplicate via HashSet then collect into Vec, which
yields non-deterministic ordering; update these functions to produce
deterministic output by either (a) replacing the HashSet dedupe with an IndexSet
from the indexmap crate to preserve insertion order while removing duplicates,
or (b) after collecting into a Vec, sort the Vec deterministically (e.g.,
lexicographically by tag/label string or by a stable Ord implementation) before
returning; apply the same change to both parse_dataset_tags and
parse_dataset_labels so repeated calls with the same input yield a stable
ordering.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/handlers/mod.rs`:
- Around line 131-151: The functions parse_dataset_tags (and
parse_dataset_labels) deduplicate via HashSet then collect into Vec, which
yields non-deterministic ordering; update these functions to produce
deterministic output by either (a) replacing the HashSet dedupe with an IndexSet
from the indexmap crate to preserve insertion order while removing duplicates,
or (b) after collecting into a Vec, sort the Vec deterministically (e.g.,
lexicographically by tag/label string or by a stable Ord implementation) before
returning; apply the same change to both parse_dataset_tags and
parse_dataset_labels so repeated calls with the same input yield a stable
ordering.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (2)
  • src/handlers/http/mod.rs
  • src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/connectors/kafka/processor.rs
  • src/storage/field_stats.rs
  • src/storage/object_storage.rs
  • src/handlers/http/modal/server.rs

@coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

211-217: ⚠️ Potential issue | 🟡 Minor

Normalize labels before deduplication (trim + drop empty).

Current label handling can persist empty/whitespace-only labels and produce noisy correlation results.

Proposed fix
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 211 - 217, The label handling for
final_labels (when reading body.labels or falling back to
stream.get_dataset_labels()) must normalize values before deduplication: trim
whitespace from each label and drop any empty/whitespace-only strings, then
collect into a HashSet to dedupe and back into the final collection. Update the
Some(labels) branch (body.labels) to map each label through trim + to_string,
filter out empties, then dedupe; and apply the same normalization/filtering to
the fallback stream.get_dataset_labels() result so both paths produce cleaned,
non-empty, deduplicated labels.

203-218: ⚠️ Potential issue | 🔴 Critical

Partial-update merge is still vulnerable to lost updates under concurrency.

The `None => existing` fallback followed by a single unconditional write means concurrent requests (one tags-only, one labels-only) can overwrite each other's field with stale values. This is still a TOCTOU/lost-write bug.
Suggested direction

Serialize the read-merge-write path per stream (mutex/lock), or move merge semantics into storage as an atomic patch operation so omitted fields are preserved against the latest persisted state.

Also applies to: 220-233

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 203 - 218, The current
read-merge-write for final_tags/final_labels uses body.tags/body.labels with
fallback to stream.get_dataset_tags()/get_dataset_labels(), which allows TOCTOU
lost-updates under concurrent partial updates; to fix, serialize the
read-merge-write for a given stream (e.g., acquire a per-stream mutex/lock
around reading stream.get_dataset_*(), merging with body.tags/body.labels, and
writing) or implement an atomic patch in storage that merges omitted fields
server-side so updates to tags and labels cannot overwrite each other; update
the code paths that compute final_tags and final_labels (and the duplicated
logic at lines ~220-233) to use the chosen serialized/atomic approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 192-201: The handler put_dataset_metadata currently relies on
middleware that doesn't extract the `{name}` path param, allowing a
resource-level authorization bypass; add an explicit per-dataset RBAC check
inside put_dataset_metadata using the same pattern as get_correlated_datasets:
call Users.authorize (or the existing Users instance used in handlers) with
Action::CreateStream and a concrete resource context built from the dataset_name
and tenant_id (or equivalent Resource::Stream/StreamIdentifier used by your RBAC
layer) and return an authorization error if the call denies access;
alternatively, fix auth_resource_context to extract the `name` path parameter
before calling Users.authorize so context_resource is Some(name) for resource
checks.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 211-217: The label handling for final_labels (when reading
body.labels or falling back to stream.get_dataset_labels()) must normalize
values before deduplication: trim whitespace from each label and drop any
empty/whitespace-only strings, then collect into a HashSet to dedupe and back
into the final collection. Update the Some(labels) branch (body.labels) to map
each label through trim + to_string, filter out empties, then dedupe; and apply
the same normalization/filtering to the fallback stream.get_dataset_labels()
result so both paths produce cleaned, non-empty, deduplicated labels.
- Around line 203-218: The current read-merge-write for final_tags/final_labels
uses body.tags/body.labels with fallback to
stream.get_dataset_tags()/get_dataset_labels(), which allows TOCTOU lost-updates
under concurrent partial updates; to fix, serialize the read-merge-write for a
given stream (e.g., acquire a per-stream mutex/lock around reading
stream.get_dataset_*(), merging with body.tags/body.labels, and writing) or
implement an atomic patch in storage that merges omitted fields server-side so
updates to tags and labels cannot overwrite each other; update the code paths
that compute final_tags and final_labels (and the duplicated logic at lines
~220-233) to use the chosen serialized/atomic approach.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c0f20c84-897c-4668-a2bd-eff10b483cf9

📥 Commits

Reviewing files that changed from the base of the PR and between 6cab92e and 9fa8536.

📒 Files selected for processing (1)
  • src/handlers/http/datasets.rs

@coderabbitai bot left a comment

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

213-240: ⚠️ Potential issue | 🔴 Critical

Partial PUTs can still clobber each other under concurrency.

Combining tags and labels into one endpoint removed the cross-endpoint race, but the None => stream.get_dataset_*() merge keeps a read-modify-write window here. If one request updates only tags while another updates only labels, both can read the same old state and the later write will restore one field to a stale value. Please serialize metadata updates per stream, or move the merge into a single atomic storage-side update/CAS.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 213 - 240, The current
read-modify-write uses body.tags/body.labels with
stream.get_dataset_tags()/stream.get_dataset_labels() then calls
update_dataset_tags_and_labels_in_stream, which allows concurrent partial-PUTs
to clobber each other; fix by moving the merge/compare-and-swap into the storage
layer (make update_dataset_tags_and_labels_in_stream perform an atomic
read-merge-CAS using tenant_id and dataset_name) or by adding per-stream
serialization around this handler (e.g., a per-stream mutex) so updates to
final_tags/final_labels cannot interleave; modify the storage method or
introduce a per-stream lock and remove the local stream.get_dataset_* merge to
ensure atomicity.
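One way to picture the per-stream serialization option the review suggests is a lock registry keyed by stream name. This is a minimal sketch, not the project's actual code: `StreamLocks` and `lock_for` are hypothetical names, and the real handler would hold the guard across its read-merge-write of tags and labels.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical per-stream lock registry: one Mutex per stream name, so
// concurrent partial PUTs against the same stream serialize their
// read-modify-write instead of clobbering each other's fields.
struct StreamLocks {
    locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl StreamLocks {
    fn new() -> Self {
        Self {
            locks: Mutex::new(HashMap::new()),
        }
    }

    // Return the lock guarding `stream`, creating it on first use.
    // Requests for the same stream get the same Arc, so their guards contend.
    fn lock_for(&self, stream: &str) -> Arc<Mutex<()>> {
        let mut map = self.locks.lock().unwrap();
        map.entry(stream.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone()
    }
}

fn main() {
    let registry = StreamLocks::new();
    let lock = registry.lock_for("my-dataset");
    let _guard = lock.lock().unwrap();
    // ...read current tags/labels, merge with the request body, write back,
    // all while holding _guard...
    println!("holding per-stream lock for my-dataset");
}
```

The alternative the prompt mentions, a storage-side compare-and-swap, avoids holding in-process locks but needs the object store (or a version field) to reject stale writes.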

221-227: ⚠️ Potential issue | 🟡 Minor

Trim and drop blank labels before persisting.

This path still accepts "", whitespace-only labels, and " foo " as distinct labels, so correlation can diverge from the header-based creation flow.

Suggested fix

```diff
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 221 - 227, Normalize incoming
labels by trimming whitespace and dropping empty strings before collecting into
the final set: in the Some(labels) branch where final_labels is built, replace
the current collect pipeline with one that maps each label through
trim().to_string(), filters out empty strings (after trim), then collects into a
HashSet to deduplicate and back into the desired collection. Also ensure the
same normalization is applied if you use stream.get_dataset_labels() or any
other source so header-based and body-based flows match.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c8ea088e-5c7e-4c3a-b5e1-e9cf1aee7f18

📥 Commits

Reviewing files that changed from the base of the PR and between 9fa8536 and c37a57d.

📒 Files selected for processing (1)
  • src/handlers/http/datasets.rs
